/* Copyright (c) 2022 渝州大数据实验室
 *
 * Lanius is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *     http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */
package org.yzbdl.lanius.orchestrate.serv.quartz.taskplan;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.PersistJobDataAfterExecution;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.yzbdl.lanius.orchestrate.common.dto.resource.ServerProgramInfoDTO;
import org.yzbdl.lanius.orchestrate.common.dto.task.TaskPlanResourceDTO;
import org.yzbdl.lanius.orchestrate.common.entity.resource.TaskResourceConfigEntity;
import org.yzbdl.lanius.orchestrate.common.entity.task.TaskInstance;
import org.yzbdl.lanius.orchestrate.common.enums.ResourceDataStatusEnum;
import org.yzbdl.lanius.orchestrate.common.enums.TaskInstanceStatusEnum;
import org.yzbdl.lanius.orchestrate.common.enums.TaskStatusEnum;
import org.yzbdl.lanius.orchestrate.serv.constant.EtlRequestConstant;
import org.yzbdl.lanius.orchestrate.serv.constant.TaskPlanSchedulerConstant;
import org.yzbdl.lanius.orchestrate.serv.quartz.config.BaseTaskJob;
import org.yzbdl.lanius.orchestrate.serv.quartz.handler.category.ProgramCategoryExecutorFactory;
import org.yzbdl.lanius.orchestrate.serv.quartz.handler.category.ProgramCategoryExecutorInvokerHandler;
import org.yzbdl.lanius.orchestrate.serv.service.resource.ServerProgramService;
import org.yzbdl.lanius.orchestrate.serv.service.task.TaskInstanceService;
import org.yzbdl.lanius.orchestrate.serv.service.task.TaskPlanService;
import org.yzbdl.lanius.orchestrate.serv.utils.CommonUtil;
import org.yzbdl.lanius.orchestrate.serv.utils.DateUtil;
import org.yzbdl.lanius.orchestrate.serv.utils.TaskInstanceLogUtil;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 任务编排定时任务
 *
 * @author jinchunzhao@yzbdl.ac.cn
 * @date 2022-04-11 15:58
 */
@Slf4j
@Component
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public class TaskPlanQuartzTaskJob implements BaseTaskJob {

    @Autowired
    private ThreadPoolExecutor executor;

    @Autowired
    private TaskInstanceService taskInstanceService;

    @Autowired
    private TaskPlanService taskPlanService;

    @Autowired
    private ServerProgramService serverProgramService;

    @Override
    public void execute(JobExecutionContext context) {
        // 执行任务编排计划任务
        JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
        callTaskPlanTask(jobDataMap);
    }

    /**
     * 执行任务编排
     *
     * @param jobDataMap
     *        参数
     */
    private void callTaskPlanTask(JobDataMap jobDataMap) {
        String currentDate = DateUtil.format(new Date(), DateUtil.DATE_FORMAT_SECOND);
        log.info("开始执行定时任务  {}", currentDate);

        Long nodeId = jobDataMap.getLong(TaskPlanSchedulerConstant.TASK_PLAN_NODE_ID);
        Long taskPlanId = jobDataMap.getLong(TaskPlanSchedulerConstant.TASK_PLAN_ID);
        boolean async = jobDataMap.getBoolean(TaskPlanSchedulerConstant.TASK_PLAN_IS_ASYNC);
        boolean isIncrementalLog = jobDataMap.getBoolean(TaskPlanSchedulerConstant.TASK_PLAN_INCR_LOG);
        long taskPlanGroupId = jobDataMap.getLong(TaskPlanSchedulerConstant.TASK_PLAN_GROUP_ID);

        TaskPlanResourceDTO taskPlanResourceDTO = taskPlanService.getTaskPlanAndResourceInfoIgnoreTenantId(taskPlanId);
        Integer logLevel = taskPlanResourceDTO.getLogLevel();

        Long taskInstanceId = null;
        if (Objects.isNull(taskPlanResourceDTO)) {
            log.warn("执行任务编排调度任务：未查询到任务编排数据！");
            taskInstanceId = skipTaskInstance(nodeId,-1L, taskPlanId, -1L,logLevel);

            String msg = String.format("%s - 执行任务编排调度任务：未查询到任务编排数据！\r\n", currentDate);
            TaskInstanceLogUtil.writerLog(-1L, taskPlanGroupId, taskPlanId, taskInstanceId, msg);
            return;
        }

        taskPlanGroupId = taskPlanResourceDTO.getGroupId();
        Long serverProgramId = taskPlanResourceDTO.getServerProgramId();
        Long taskResourceId = taskPlanResourceDTO.getTaskResourceId();
        String taskName = taskPlanResourceDTO.getTaskName();

        // 检验任务编排状态
        Integer taskPlanStatus = taskPlanResourceDTO.getStatus();
        if (Objects.equals(taskPlanStatus, TaskStatusEnum.DISABLED.getCode())) {
            log.warn("任务编排：【{}】已被禁用，不能执行任务!", taskName);
            taskInstanceId = skipTaskInstance(serverProgramId,taskResourceId, taskPlanId, taskPlanResourceDTO.getOrgId(),logLevel);

            String msg = String.format("%s - 任务编排：【%s】已被禁用，不能执行任务!\r\n", currentDate,taskName);
            TaskInstanceLogUtil.writerLog(taskPlanResourceDTO.getOrgId(), taskPlanGroupId, taskPlanId, taskInstanceId,
                msg);
            return;
        }
        // 检验任务资源配置状态
        TaskResourceConfigEntity taskResourceConfigEntity = taskPlanResourceDTO.getTaskResourceConfigEntity();
        Integer status = taskResourceConfigEntity.getStatus();
        if (Objects.equals(status, ResourceDataStatusEnum.ABNORMAL.getCode())) {
            log.warn("任务编排：【{}】关联任务资源配置异常，不能执行任务!", taskName);
            taskInstanceId = skipTaskInstance(serverProgramId,taskResourceId, taskPlanId, taskPlanResourceDTO.getOrgId(),logLevel);

            String msg = String.format("%s - 任务编排：【%s】关联任务资源配置异常，不能执行任务!\r\n", currentDate,taskName);
            TaskInstanceLogUtil.writerLog(taskPlanResourceDTO.getOrgId(), taskPlanGroupId, taskPlanId, taskInstanceId,
                msg);
            return;
        }

        // 1. 检测是否可以并行执行任务
        if (!isParallel(jobDataMap)) {
            log.warn("任务编排：【{}】存在运行中的实例，不能并行执行任务!", taskName);
            // 跳过该计划
            taskInstanceId = skipTaskInstance(serverProgramId,taskResourceId, taskPlanId, taskPlanResourceDTO.getOrgId(),logLevel);

            String msg = String.format("%s - 任务编排：【%s】存在运行中的实例，不能并行执行任务!\r\n", currentDate, taskName);
            TaskInstanceLogUtil.writerLog(taskPlanResourceDTO.getOrgId(), taskPlanGroupId, taskPlanId, taskInstanceId,
                    msg);
            return;
        }

        // 初始化任务实例信息
        TaskInstance taskInstance = initTaskInstance(taskPlanResourceDTO);

        // 2. 发起任务执行请求
        if (async) {
            // 异步执行
            CompletableFuture.runAsync(
                () -> executeTaskPlanJob(serverProgramId, taskInstance, taskPlanResourceDTO, isIncrementalLog), executor);
        } else {
            executeTaskPlanJob(serverProgramId, taskInstance, taskPlanResourceDTO, isIncrementalLog);
        }
    }

    /**
     * 执行任务编排调度任务
     *
     * @param nodeId
     *        服务节点ID
     * @param taskInstance
     *        任务实例
     * @param taskPlanResourceDTO
     *        任务编排详细
     * @param isIncrementalLog
     *        增量日志
     */
    private void executeTaskPlanJob(Long nodeId, TaskInstance taskInstance, TaskPlanResourceDTO taskPlanResourceDTO,
        Boolean isIncrementalLog) {
        String currentDate = DateUtil.format(new Date(), DateUtil.DATE_FORMAT_SECOND);
        ServerProgramInfoDTO serverProgram = serverProgramService.getServerProgramInfoByIdIgnoreTenantId(nodeId);

        Long taskPlanGroupId = taskPlanResourceDTO.getGroupId();
        Long orgId = taskPlanResourceDTO.getOrgId();
        Long taskPlanId = taskPlanResourceDTO.getId();
        if (Objects.isNull(serverProgram)) {
            log.warn("执行任务编排调度任务：未查询到服务节点数据！--调度结束");
            updateTaskInstanceStatus(taskInstance.getId(), TaskInstanceStatusEnum.SKIP.getCode());

            String msg = String.format("%s - 执行任务编排调度任务【%s】：未查询到服务节点数据！--调度结束\r\n", currentDate, taskPlanResourceDTO.getTaskName());
            TaskInstanceLogUtil.writerLog(orgId, taskPlanGroupId, taskPlanId, taskInstance.getId(),
                    msg);
            return;
        }

        Integer programCategory = serverProgram.getCategory();
        if (Objects.isNull(programCategory)) {
            log.warn("执行任务编排调度任务：未查询到服务节点程序类型数据！--调度结束");
            updateTaskInstanceStatus(taskInstance.getId(), TaskInstanceStatusEnum.SKIP.getCode());

            String msg = String.format("%s - 执行任务编排调度任务【%s】：未查询到服务节点程序类型数据！--调度结束\r\n", currentDate, taskPlanResourceDTO.getTaskName());
            TaskInstanceLogUtil.writerLog(orgId, taskPlanGroupId, taskPlanId, taskInstance.getId(),
                    msg);
            return;
        }
        ProgramCategoryExecutorInvokerHandler handler =
            ProgramCategoryExecutorFactory.getHandler(programCategory.toString());
        if (Objects.isNull(handler)) {
            log.warn("未找到对应的服务节点程序类型处理器");
            updateTaskInstanceStatus(taskInstance.getId(), TaskInstanceStatusEnum.SKIP.getCode());

            String msg = String.format("%s - 执行任务编排调度任务【%s】：未找到对应的服务节点程序类型处理器！--调度结束\r\n", currentDate, taskPlanResourceDTO.getTaskName());
            TaskInstanceLogUtil.writerLog(orgId, taskPlanGroupId, taskPlanId, taskInstance.getId(),
                    msg);
            return;
        }
        handler.executorTaskPlanJob(taskInstance, taskPlanResourceDTO, serverProgram, isIncrementalLog);

    }

    /**
     * 初始化任务实例信息
     *
     * @param taskPlanResourceDTO
     *        任务编排
     * @return 任务实例
     */
    private TaskInstance initTaskInstance(TaskPlanResourceDTO taskPlanResourceDTO) {

        Long taskPlanId = taskPlanResourceDTO.getId();
        Long groupId = taskPlanResourceDTO.getGroupId();
        Long orgId = taskPlanResourceDTO.getOrgId();

        Long serverProgramId = taskPlanResourceDTO.getServerProgramId();
        Long taskResourceId = taskPlanResourceDTO.getTaskResourceId();

        TaskInstance taskInstance = new TaskInstance();
        taskInstance.setTaskPlanId(taskPlanId);
        taskInstance.setServerProgramId(serverProgramId);
        taskInstance.setTaskResourceId(taskResourceId);
        taskInstance.setBeginTime(new Date());
        taskInstance.setStatus(TaskInstanceStatusEnum.BE_INIT.getCode());
        Map<String, Object> map = new HashMap<>(10);
        map.put(EtlRequestConstant.CARTE_OBJ_ID, UUID.randomUUID().toString());
        taskInstance.setTaskInfo(JSONUtil.toJsonStr(map));

        taskInstance.setOrgId(taskPlanResourceDTO.getOrgId());
        taskInstance.setLogLevel(taskPlanResourceDTO.getLogLevel());
        taskInstanceService.save(taskInstance);

        String currentDate = DateUtil.format(new Date(), DateUtil.DATE_FORMAT_SECOND);
        String msg = String.format("%s - 开始执行任务调度！\r\n", currentDate);
        Long taskInstanceId = taskInstance.getId();
        TaskInstanceLogUtil.writerLog(orgId, groupId, taskPlanId, taskInstanceId, msg);
        return taskInstance;
    }

    /**
     * 跳过该任务计划
     *
     * @param nodeId
     *        服务节点ID
     * @param taskResourceId
     *        任务资源ID
     * @param taskPlanId
     *        任务编排ID
     * @param orgId
     *        组织ID
     * @param logLevel
     *        日志级别
     * @return 实例id
     */
    private Long skipTaskInstance(Long nodeId,Long taskResourceId, Long taskPlanId, Long orgId,Integer logLevel) {
        Date date = new Date();
        TaskInstance taskInstance = new TaskInstance();
        taskInstance.setTaskPlanId(taskPlanId);
        taskInstance.setServerProgramId(nodeId);
        taskInstance.setBeginTime(date);
        taskInstance.setEndTime(date);
        taskInstance.setStatus(TaskInstanceStatusEnum.SKIP.getCode());
        taskInstance.setLogLevel(logLevel);
        Map<String, Object> map = new HashMap<>(10);
        map.put(EtlRequestConstant.CARTE_OBJ_ID, UUID.randomUUID().toString());
        taskInstance.setTaskInfo(JSONUtil.toJsonStr(map));
        taskInstance.setTaskResourceId(taskResourceId);
        taskInstance.setOrgId(orgId);
        taskInstanceService.save(taskInstance);
        return taskInstance.getId();
    }

    /**
     * 检测任务是否有实例状态未结束，且任务不能并行执行（跳过）
     *
     * @param taskPlanId
     *        任务编排ID
     * @return
     *        false：可以执行， true: 不可以执行
     */
    private boolean isCanRun(Long taskPlanId) {
        Boolean run = taskInstanceService.checkInstanceForRun(taskPlanId);
        return !run;
    }

    /**
     * 是否并行执行任务
     *
     * @param jobDataMap
     *        调度系统传递参数
     * @return
     *        true/false
     */
    private boolean isParallel(JobDataMap jobDataMap) {
        boolean isParallel = jobDataMap.getBoolean(TaskPlanSchedulerConstant.TASK_PLAN_IS_PARALLEL);
        Long taskPlanId = jobDataMap.getLong(TaskPlanSchedulerConstant.TASK_PLAN_ID);

        if (!isParallel) {
            return isCanRun(taskPlanId);
        }
        return true;
    }

    /**
     * 更新任务实例状态
     *
     * @param taskInstanceId
     *        任务实例ID
     * @param status
     *        状态编码
     */
    private void updateTaskInstanceStatus(Long taskInstanceId, Integer status) {
        List<Integer> runStateList = CommonUtil.getRunTaskInstanceStatus();
        boolean taskEnd = !runStateList.contains(status);

        TaskInstance taskInstance = new TaskInstance();
        taskInstance.setId(taskInstanceId);
        taskInstance.setStatus(status);
        // 结束设置结束时间
        if (taskEnd) {
            taskInstance.setEndTime(new Date());
        }
        taskInstanceService.updateByIdIgnoreTenantId(taskInstance);
    }
}
